package drds.plus.api;

import drds.plus.common.Constants;
import drds.plus.common.lifecycle.AbstractLifecycle;
import drds.plus.common.properties.ConnectionProperties;
import drds.plus.common.utils.thread.NamedThreadFactory;
import drds.plus.common.utils.thread.RightNowRunRunnableExecutorService;
import drds.plus.executor.Executor;
import drds.plus.util.GeneralUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Entry point that hands out {@link SessionImpl} sessions and manages the
 * executor services used to run queries in parallel.
 *
 * <p>Two executor modes are supported, selected in {@link #doInit()}:
 * <ul>
 *   <li><b>per-session pools</b> - idle executors are kept in
 *       {@link #executorServiceQueue}; new ones are created on demand while the
 *       thread budget {@link #maxActive} allows it;</li>
 *   <li><b>one shared pool</b> - a single {@link #globalExecutorService} is
 *       returned to every borrower.</li>
 * </ul>
 *
 * <p>NOTE(review): {@link #activeCount} is only ever increased when a pool is
 * created; pools that are shut down elsewhere never give their budget back.
 */
@Slf4j
public class SessionManager extends AbstractLifecycle {

    /**
     * Exposes the Lombok-generated logger of this class.
     *
     * @return the logger used by this class
     */
    public Logger log() {
        return log;
    }

    @Setter
    @Getter
    private String applicationId = null;

    @Setter
    @Getter
    private Executor executor = null;

    /** Raw connection properties; read through {@link GeneralUtil} when sizing pools. */
    @Setter
    @Getter
    private Map<String, Object> connectionProperties = new HashMap<String, Object>(2);

    @Setter
    @Getter
    private ConfigHolder configHolder;

    /**
     * Thread pool used for parallel queries when one pool is shared by all sessions.
     */
    @Setter
    @Getter
    private ExecutorService globalExecutorService = null;

    /**
     * {@code true} when the shared pool was supplied through
     * {@link #setGlobalExecutorService(ExecutorService)}; such a pool is not shut
     * down by {@link #doDestroy()}.
     */
    @Setter
    @Getter
    private boolean shareGlobalExecutor = false;

    /** Idle per-session executors waiting to be borrowed again. */
    @Setter
    @Getter
    private LinkedBlockingQueue<ExecutorService> executorServiceQueue = null;

    @Setter
    @Getter
    private String configMode;

    /** Upper bound for the sum of the sizes of all per-session pools. */
    @Setter
    @Getter
    private Long maxActive;

    /** Sum of the sizes of the per-session pools created so far. */
    @Setter
    @Getter
    private AtomicLong activeCount = new AtomicLong(0);

    /**
     * Creates the {@link Executor} and the {@link ConfigHolder} and prepares the
     * executor mode selected by the connection properties.
     *
     * @throws RuntimeException if a shared pool is requested but
     *                          {@code CONCURRENT_THREAD_SIZE} is not configured
     * @throws NumberFormatException if {@code CONCURRENT_THREAD_SIZE} is not an integer
     * @throws IllegalArgumentException if {@code CONCURRENT_THREAD_SIZE} is not positive
     */
    public void doInit() throws RuntimeException {

        this.executor = new Executor();
        executor.init();

        ConfigHolder holder = new ConfigHolder();
        holder.setApplicationId(applicationId);
        holder.setConnectionProperties(this.connectionProperties);
        // NOTE: holder.init() is intentionally not invoked here (disabled in the original code).
        this.configHolder = holder;

        // If a pool is not initialised for every connection, a single pool is
        // initialised for the whole data source instead.
        boolean everyConnectionPool = GeneralUtil.getExtraCmdBoolean(this.getConnectionProperties(), ConnectionProperties.INIT_CONCURRENT_POOL_EVERY_CONNECTION, true);
        maxActive = GeneralUtil.getExtraCmdLong(this.getConnectionProperties(), ConnectionProperties.MAX_CONCURRENT_THREAD_SIZE, Constants.MAX_CONCURRENT_THREAD_SIZE);
        if (everyConnectionPool) {
            executorServiceQueue = new LinkedBlockingQueue<ExecutorService>();
        } else if (globalExecutorService == null) {
            // Globally shared thread pool: its size must be configured explicitly.
            Integer poolSize = configuredPoolSize();
            if (poolSize == null) {
                throw new RuntimeException("如果线程池为整个datasource共用，请使用CONCURRENT_THREAD_SIZE指定线程池大小");
            }
            globalExecutorService = createThreadPool(poolSize, false);
        }
    }

    /**
     * Opens a new session bound to this manager.
     *
     * @return a new {@link SessionImpl}
     * @throws SQLException declared for callers; see {@link SessionImpl}
     */
    public SessionImpl getConnection() throws SQLException {
        return new SessionImpl(this);
    }

    /**
     * Returns an executor service for running queries in parallel.
     *
     * <p>The shared pool is returned when one exists. Otherwise an idle executor is
     * taken from the queue (shut-down ones are discarded) or, if none is idle, a new
     * one is created: a real thread pool while the thread budget allows it, a
     * {@link RightNowRunRunnableExecutorService} once the budget is exhausted.
     *
     * @return a usable executor service, never {@code null}
     * @throws IllegalStateException if neither a shared pool nor the idle queue is
     *                               available, i.e. the manager was not initialised
     */
    public ExecutorService borrowExecutorService() {
        ExecutorService shared = globalExecutorService;
        if (shared != null) {
            return shared;
        }
        LinkedBlockingQueue<ExecutorService> idle = executorServiceQueue;
        Long limit = maxActive;
        if (idle == null || limit == null) {
            throw new IllegalStateException("SessionManager is not initialised: no executor service is available");
        }
        // Iterative instead of recursive so that a queue full of shut-down
        // executors cannot grow the call stack.
        ExecutorService candidate;
        while ((candidate = idle.poll()) != null) {
            if (!candidate.isShutdown()) {
                return candidate;
            }
        }
        return createSessionExecutorService(limit);
    }

    /**
     * Creates an executor for a single borrower, charging its size against the
     * thread budget.
     *
     * @param limit maximum value {@link #activeCount} may reach
     * @return a thread pool if the budget allows it, otherwise an executor created
     *         with {@code rightNowRun == true}
     */
    private ExecutorService createSessionExecutorService(long limit) {
        Integer configured = configuredPoolSize();
        int poolSize = configured != null ? configured : Constants.DEFAULT_CONCURRENT_THREAD_SIZE;

        if (activeCount.addAndGet(poolSize) > limit) {
            // Over budget: give the reservation back, otherwise the counter would
            // also account for threads that were never started.
            activeCount.addAndGet(-poolSize);
            return createThreadPool(poolSize, true);
        }
        try {
            return createThreadPool(poolSize, false);
        } catch (RuntimeException e) {
            // Pool creation failed (e.g. invalid size): undo the reservation.
            activeCount.addAndGet(-poolSize);
            throw e;
        }
    }

    /**
     * Reads {@code CONCURRENT_THREAD_SIZE} from the connection properties.
     *
     * @return the configured size, or {@code null} when the property is absent
     * @throws NumberFormatException if the property is not an integer
     */
    private Integer configuredPoolSize() {
        Object raw = GeneralUtil.getExtraCmdString(this.getConnectionProperties(), ConnectionProperties.CONCURRENT_THREAD_SIZE);
        return raw == null ? null : Integer.valueOf(raw.toString());
    }

    /**
     * Builds an executor service.
     *
     * @param poolSize    number of threads of the pool; its work queue holds twice
     *                    as many tasks, and tasks beyond that run on the submitting
     *                    thread ({@link ThreadPoolExecutor.CallerRunsPolicy})
     * @param rightNowRun when {@code true} no thread pool is created and a
     *                    {@link RightNowRunRunnableExecutorService} is returned
     *                    (presumably runs tasks on the calling thread - confirm
     *                    against that class)
     * @return the new executor service
     * @throws IllegalArgumentException if a thread pool is requested with a
     *                                  non-positive {@code poolSize}
     */
    private ExecutorService createThreadPool(int poolSize, boolean rightNowRun) {
        if (rightNowRun) {
            return new RightNowRunRunnableExecutorService();
        }
        if (poolSize <= 0) {
            throw new IllegalArgumentException("CONCURRENT_THREAD_SIZE must be a positive integer, but was: " + poolSize);
        }
        return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(poolSize * 2),
                new NamedThreadFactory("concurrent_query_executor", true),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Gives a borrowed executor back so that it can be reused.
     *
     * <p>{@code null} and the shared pool are ignored. When no idle queue exists
     * (shared-pool mode or manager not initialised) the executor cannot be pooled
     * and is left untouched.
     *
     * @param executorService the executor obtained from {@link #borrowExecutorService()}
     */
    public void releaseExecutorService(ExecutorService executorService) {
        if (executorService == null || executorService == globalExecutorService) {
            return;
        }
        LinkedBlockingQueue<ExecutorService> idle = executorServiceQueue;
        if (idle == null) {
            log.warn("No idle executor queue available; released executor service is not pooled");
            return;
        }
        idle.offer(executorService); // put back into the queue
    }

    /**
     * Shuts down the executors owned by this manager and destroys the
     * {@link Executor}. A shared pool injected from outside is left running.
     */
    public void doDestroy() throws RuntimeException {

        if (!shareGlobalExecutor && globalExecutorService != null) {
            globalExecutorService.shutdownNow();
        }
        LinkedBlockingQueue<ExecutorService> idle = executorServiceQueue;
        if (idle != null) {
            // Drain with poll() so that every executor removed from the queue is
            // also shut down, even if others are released concurrently.
            ExecutorService pooled;
            while ((pooled = idle.poll()) != null) {
                pooled.shutdownNow();
            }
        }
        // NOTE: configHolder.destroy() is intentionally not invoked (disabled in the original code).
        if (executor != null && executor.isInited()) {
            executor.destroy();
        }
    }


    /**
     * Injects an externally managed pool to be shared by all sessions. The pool is
     * marked as shared and therefore not shut down by {@link #doDestroy()}.
     *
     * @param globalExecutorService the pool to share
     */
    public void setGlobalExecutorService(ExecutorService globalExecutorService) {
        this.globalExecutorService = globalExecutorService;
        this.shareGlobalExecutor = true;
    }

}
